#include <webx/menu.h>
#include <webx/route.h>
#include <http/HttpHelper.h>
#include <dbentity/T_XG_TIMER.h>

// Status recorded by the most recent GetRemoteResult() call on the current thread
// (thread_local, so each thread sees only its own last request).
static thread_local int remotestatus = 0;
// Server object returned by HttpServer::Instance(); used below for config, identity and local dispatch.
static HttpServer* app = HttpServer::Instance();

class TimerTask : public WorkItem
{
public:
	int delay;
	int status;
	string path;
	time_t rtime;
	time_t utime;
	string timeval;
	atomic_bool skip;

	bool uptime()
	{
		if (timeval.find(':') == string::npos)
		{
			rtime = utime;
		}
		else
		{
			rtime = DateTime::FromString(DateTime(utime).getDateString() + " " + timeval).getTime();
		}

		return true;
	}
	void process()
	{
		skip = stdx::async([this]{
			SmartBuffer data = webx::GetRemoteResult(this->path);

			if (data.str())
			{
				int status = 0;
				string code = JsonElement(data.str()).asString("code");

				if (code.empty())
				{
					int code = webx::GetLastRemoteStatus();

					if (code == 200)
					{
						status = XG_OK;
					}
					else if (code == 404)
					{
						status = XG_UNINSTALL;
					}
					else
					{
						status = XG_ERROR;
					}
				}
				else
				{
					status = stdx::atoi(code.c_str());

					if (status >= 0) status = XG_OK;
				}

				if (status)
				{
					this->utime = time(NULL);
					this->save(status);
				}
			}

			this->skip = false;
		});
	}
	bool runnable()
	{
		if (skip) return false;

		if (status == XG_UNINSTALL) return true;

		string val;
		time_t now = time(NULL);

		Check(path, false, val);

		if (val.empty())
		{
			save(XG_UNINSTALL);

			return true;
		}

		if (val == timeval)
		{
			if (rtime + delay > now) return false;

			utime = now;

			save();
			uptime();
			process();
		}
		else
		{
			if (update(val))
			{
				LogTrace(eINF, "update timertask[%s][%s] success", path.c_str(), val.c_str());
			}
			else
			{
				LogTrace(eERR, "update timertask[%s][%s] failed", path.c_str(), val.c_str());
			}
		}

		return false;
	}
	bool save(int status = 0)
	{
		CT_XG_TIMER tab;

		try
		{
			tab.init(webx::GetDBConnect());
		}
		catch(Exception e)
		{
			return false;
		}

		tab.path = path;
		tab.status = status;
		tab.timeval = timeval;
		tab.statetime = DateTime(utime);

		if ((this->status = status) == XG_UNINSTALL) return Check(path, true) && tab.remove() > 0;

		if (status = tab.update()) return status > 0;

		return tab.insert() > 0;
	}
	bool update(const string& timeval)
	{
		CT_XG_TIMER tab;

		try
		{
			tab.init(webx::GetDBConnect());
		}
		catch(Exception e)
		{
			return false;
		}

		tab.path = path;

		int delay = timeval.find(':') == string::npos ? stdx::atoi(timeval.c_str()) : 24 * 3600;

		if (tab.find() && tab.next())
		{
			utime = tab.statetime.val().getTime();

			tab.timeval = timeval;
			
			if (tab.update() < 0) return false;

			if (timeval.find(':') != string::npos)
			{
				string now = tab.statetime.toString();
				string date = DateTime(time(NULL)).getDateString();
	
				if (now.find(date) == 0 && timeval > now.substr(11)) utime -= delay;
			}
		}
		else
		{
			utime = time(NULL) - delay;

			tab.statetime = DateTime(utime);
			tab.timeval = timeval;
			tab.status = 0;

			if (tab.insert() < 0) return false;
		}

		this->timeval = timeval;
		this->delay = delay;

		return uptime();
	}
	bool init(const string& path, const string& timeval)
	{
		this->path = path;
		this->skip = false;

		return update(timeval);
	}

	static void Clear(sp<DBConnect> dbconn)
	{
		dbconn->execute("DELETE FROM T_XG_TIMER WHERE STATETIME<?", DateTime(time(NULL) - 3 * 24 * 3600));
	}
	static bool Check(const string& path, bool clear)
	{
		string timeval;

		return Check(path, clear, timeval);
	}
	static bool Check(const string& path, bool clear, string& timeval)
	{
		static map<string, string> pathmap;
		static SpinMutex mtx;
		SpinLocker lk(mtx);

		if (path.front() == '{' && path.back() == '}')
		{
			vector<string> vec;

			for (const auto& item : pathmap)
			{
				if (path.find("{" + item.first + "}") == string::npos)
				{
					vec.push_back(item.first);
				}
			}

			if (vec.size() > 0)
			{
				CT_XG_TIMER tab;

				try
				{
					tab.init(webx::GetDBConnect());
				}
				catch(Exception e)
				{
					return false;
				}

				for (const string& path : vec)
				{
					pathmap.erase(path);
					tab.path = path;
					tab.remove();
				}
			}

			return true;
		}

		if (clear)
		{
			pathmap.erase(path);

			return true;
		}

		string& val = pathmap[path];

		if (val.empty())
		{
			val = timeval;

			return true;
		}

		if (timeval.empty())
		{
			timeval = val;
		}
		else
		{
			val = timeval;
		}

		return false;
	}
};

class RouteConfig
{
protected:
	int port;
	bool route;
	string host;
	string version;
	mutable SpinMutex mtx;
	map<string, vector<HostItem>> hostmap;

public:
	static RouteConfig* Instance()
	{
		static RouteConfig cfg;

		return &cfg;
	}

public:
	void init()
	{
		if (host.empty())
		{
			ConfigFile* cfg = app->getConfigFile();

			cfg->getVariable("ROUTE_HOST", host);
			cfg->getVariable("ROUTE_PORT", port);

			route = port > 0 && host.length() > 0;
		}
	}
	RouteConfig()
	{
		route = false;
		port = 0;
	}
	HostItem getHost() const
	{
		SpinLocker lk(mtx);

		return HostItem(host, port);
	}
	HostItem get(const string& path) const
	{
		HostItem item;
		SpinLocker lk(mtx);
		const auto it = hostmap.find(CgiMapData::GetKey(path));

		if (it == hostmap.end()) return item;

		const vector<HostItem>& vec = it->second;

		if (vec.empty()) return item;

		return vec[abs(rand()) % vec.size()];
	}
	vector<HostItem> getList(const string& path) const
	{
		SpinLocker lk(mtx);
		const auto it = hostmap.find(CgiMapData::GetKey(path));

		if (it == hostmap.end()) return vector<HostItem>();

		return it->second;
	}
	bool update(const string& host, int port)
	{
		CHECK_FALSE_RETURN(host.length() > 0 && port > 0);

		JsonElement json;
		HttpRequest request("exportroute");

		request.setParameter("version", version);
		request.setParameter("access", CGI_PROTECT);

		if (route)
		{
			request.setParameter("clientid", app->getId());
			request.setParameter("clientname", app->getName());
			request.setParameter("clientport", app->getPort());
			request.setParameter("clienthost", app->getHost());
		}

		string version;
		string pathlist;
		SmartBuffer buffer = request.getResult(host, port);

		CHECK_FALSE_RETURN(buffer.str() && json.init(buffer.str()));

		version = json.asString("version");

		if (version == this->version)
		{
			SpinLocker lk(mtx);

			this->host = host;
			this->port = port;

			return true;
		}

		json = json.get("list");

		CHECK_FALSE_RETURN(json.isArray());

		map<string, vector<HostItem>> hostmap;

		for (JsonElement item : json)
		{
			int weight;
			HostItem host;
			JsonElement hostlist = item["list"];
			string path = item["path"].asString();
			vector<HostItem>& vec = hostmap[path];

			pathlist += "{" + path + "}";

			for (JsonElement data : hostlist)
			{
				weight = data["weight"].asInteger();
				host.host = data["host"].asString();
				host.port = data["port"].asInteger();

				if (weight < 1) weight = 1;
				if (weight > 9) weight = 9;

				for (int i = 0; i < weight; i++) vec.push_back(host);
			}
		}

		if (pathlist.length() > 0) TimerTask::Check(pathlist, false, pathlist);

		mtx.lock();

		std::swap(this->hostmap, hostmap);
		this->version = version;
		this->host = host;
		this->port = port;

		mtx.unlock();
		
		return true;
	}
};

/**
 * Work item that sends one HTTP request to host:port and logs whether a
 * response was obtained. All fields are filled in by the creator before queuing.
 */
class NotifyItem : public WorkItem
{
public:
	int port;
	string host;
	string path;
	string param;    // request body/data string
	string cookie;   // optional; applied only when non-empty
	string contype;  // optional content type; applied only when non-empty

public:
	/** Builds the request from the stored fields, sends it and logs the outcome. */
	void run()
	{
		HttpRequest request(path);

		request.setDataString(param);

		if (!cookie.empty()) request.setCookie(cookie);
		if (!contype.empty()) request.setContentType(contype);

		if (!request.getResponse(host, port))
		{
			LogTrace(eERR, "notify[%s:%d][%s] failed", host.c_str(), port, path.c_str());

			return;
		}

		LogTrace(eINF, "notify[%s:%d][%s] success", host.c_str(), port, path.c_str());
	}
};

class PingWorkItem : public WorkItem
{
public:
	int port;
	int errcnt;
	int maxcnt;
	string host;

public:
	PingWorkItem()
	{
		errcnt = 0;
		maxcnt = 3;
	}
	bool runnable()
	{
		if (process() >= 0) return true;

		++errcnt;

		return false;
	}
	int process()
	{
		HttpRequest request("getcgilist");

		request.setParameter("access", CGI_PROTECT);
		request.setParameter("routehost", app->getHost());
		request.setParameter("routeport", app->getPort());

		if (RedisConnect::CanUse())
		{
			sp<RedisConnect> redis = RedisConnect::Instance();

			if (redis)
			{
				request.setParameter("redishost", redis->getHost());
				request.setParameter("redisport", redis->getPort());
				request.setParameter("redispasswd", redis->getPassword());
			}
		}

		long us;
		Timer tr(NULL);
		string content;
		JsonElement json;
		SmartBuffer buffer = request.getResult(host, port);

		us = tr.getTimeGap();

		if (buffer.str() && json.init(buffer.str()) && json["list"].isArray())
		{
			LogTrace(eINF, "ping host[%s:%d] success[%ld]", host.c_str(), port, us);

			content = buffer.str();
		}
		else
		{
			LogTrace(eERR, "ping host[%s:%d] failed", host.c_str(), port);

			if (errcnt < maxcnt) return XG_NETERR;
		}

		while (maxcnt-- > 0)
		{
			string sqlcmd;
			sp<DBConnect> dbconn;

			try
			{
				dbconn = webx::GetDBConnect();
			}
			catch(Exception e)
			{
				Sleep(100);

				continue;
			}

			if (content.empty())
			{
				stdx::format(sqlcmd, "UPDATE T_XG_ROUTE SET PROCTIME=0 WHERE HOST='%s' AND PORT=%d", host.c_str(), port);

				return dbconn->execute(sqlcmd) < 0 ? XG_FAIL : XG_OK;
			}
			else
			{
				JsonElement item;
				JsonElement json(content);
				JsonElement list = json.get("list");

				for (int i = list.size() - 1; i >= 0; i--)
				{
					item = list.get(i);

					string extdata = item.asString("extdata");

					if (extdata.length() > 0)
					{
						HttpDataNode param;

						param.parse(extdata);

						string path = item.asString("path");
						string delay = param.getValue("timertask");
						string daily = param.getValue("dailytask");

						auto checkTimer = [](const string& path, const string& timeval){
							string val = timeval;

							if (TimerTask::Check(path, false, val))
							{
								sp<TimerTask> timer = newsp<TimerTask>();

								if (timer->init(path, val) && stdx::async(timer))
								{
									LogTrace(eINF, "create timertask[%s][%s] success", path.c_str(), val.c_str());
								}
								else
								{
									LogTrace(eINF, "create timertask[%s][%s] failed", path.c_str(), val.c_str());

									TimerTask::Check(path, true);
								}
							}
						};

						if (delay.length() > 0)
						{
							int val = stdx::atoi(delay.c_str());

							if (val > 0)
							{
								checkTimer(path, stdx::str(val));
							}
							else
							{
								LogTrace(eINF, "invalid timertask[%s][%s]", path.c_str(), delay.c_str());
							}
						}

						if (daily.length() > 0)
						{
							DateTime dt = DateTime::FromString(DateTime::ToString().substr(0, 11) + daily);

							if (dt.canUse())
							{
								checkTimer(path, dt.getTimeString());
							}
							else
							{
								LogTrace(eINF, "invalid dailytask[%s][%s]", path.c_str(), daily.c_str());
							}
						}
					}
				}

				sqlcmd = "UPDATE T_XG_ROUTE SET PROCTIME=?,CONTENT=?,STATETIME=? WHERE HOST=? AND PORT=?";

				return dbconn->execute(sqlcmd, us, content, DateTime().update(), host, port) < 0 ? XG_FAIL : XG_OK;
			}
		}

		return XG_FAIL;
	}
};

class PingWorkThread : public Thread
{
public:
	void run()
	{
		int index = 0;
		string sqlcmd = "SELECT HOST,PORT FROM T_XG_ROUTE WHERE ENABLED>0 GROUP BY HOST,PORT";

		while (app->isActive())
		{
			if (app->getRouteSwitch())
			{
				sp<RowData> row;
				sp<QueryResult> rs;
				sp<DBConnect> dbconn;
				vector<sp<WorkItem>> vec;

				try
				{
					dbconn = webx::GetDBConnect();
				}
				catch(Exception e)
				{
					Sleep(100);

					continue;
				}

				if (rs = dbconn->query(sqlcmd))
				{
					LogTrace(eINF, "start route ping process success");

					while (row = rs->next())
					{
						sp<PingWorkItem> item = newsp<PingWorkItem>();
						
						item->host = row->getString(0);
						item->port = row->getInt(1);
						vec.push_back(item);
					}
				}

				if (index % 100 == 0) TimerTask::Clear(dbconn);

				dbconn = NULL;

				for (auto item : vec)
				{
					stdx::async(item);

					Sleep(100);
				}

				sp<Session> session = webx::GetLocaleSession("SYSTEM_ROUTELIST");

				if (session) session->clear();
			}

			updateRouteList();

			Sleep(5000);

			++index;
		}
	}
	void updateRouteList()
	{
		HostItem item = RouteConfig::Instance()->getHost();

		if (item.canUse())
		{
			if (RouteConfig::Instance()->update(item.host, item.port))
			{
				LogTrace(eINF, "update route list success");
			}
			else
			{
				LogTrace(eERR, "update route list failed");
			}
		}
	}
};

static int NotifyHost(const char* host, int port, const char* path, const char* param, const char* contype, const char* cookie)
{
	sp<NotifyItem> item = newsp<NotifyItem>();

	item->port = port;
	item->host = host;
	item->path = path;

	if (param) item->param = param;
	if (cookie) item->cookie = cookie;
	if (contype) item->contype = contype;

	return stdx::async(item) ? XG_OK : XG_SYSBUSY;
}

static int BroadcastHost(const char* path, const char* param, const char* contype, const char* cookie)
{
	int num = 0;
	set<string> hostset;
	vector<HostItem> vec = RouteConfig::Instance()->getList(path);

	for (const HostItem& item : vec)
	{
		if (hostset.insert(item.toString()).second)
		{
			int res = NotifyHost(item.host.c_str(), item.port, path, param, contype, cookie);

			if (res < 0) return res;

			++num;
		}
	}

	return num;
}

static int GetRegCenterHost(char* host, int* port)
{
	HostItem item = RouteConfig::Instance()->getHost();

	if (item.host.empty()) return XG_NOTFOUND;

	strcpy(host, item.host.c_str());
	*port = item.port;

	return XG_OK;
}

/**
 * Refreshes the route table from the registration center at host:port.
 * @return XG_OK when RouteConfig::update() succeeds, XG_SYSERR otherwise.
 */
static int UpdateRouteList(const char* host, int port)
{
	if (RouteConfig::Instance()->update(host, port)) return XG_OK;

	return XG_SYSERR;
}

static int GetRouteHost(const char* path, char* host, int* port)
{
	HostItem item = RouteConfig::Instance()->get(path);

	if (item.host.empty()) return XG_NOTFOUND;

	strcpy(host, item.host.c_str());
	*port = item.port;

	return XG_OK;
}

/** @return the status stored by the calling thread's most recent GetRemoteResult(). */
static int GetLastRemoteStatus()
{
	const int last = remotestatus;

	return last;
}

/**
 * Executes a request for `path`, remotely when a route host exists for it and
 * through the local server otherwise, and records the outcome in `remotestatus`.
 *
 * @param path    request path (must be non-null).
 * @param param   optional request data string.
 * @param contype optional content type; ignored when null or empty.
 * @param cookie  optional cookie; ignored when null or empty.
 * @return the response payload, or an empty SmartBuffer on failure
 *         (remotestatus is then XG_SENDFAIL when the connection could not be
 *         opened, XG_RECVFAIL when no response was obtained). On success
 *         remotestatus holds the response's error code.
 */
static SmartBuffer GetRemoteResult(const char* path, const char* param, const char* contype, const char* cookie)
{
	HttpRequest request(path);
	sp<HttpResponse> response;

	if (param) request.setDataString(param);
	if (cookie && *cookie) request.setCookie(cookie);
	if (contype && *contype) request.setContentType(contype);

	// Ask the route table directly. Going through GetRouteHost() would strcpy the
	// host name into a fixed-size stack buffer and overflow it for long names.
	const HostItem target = RouteConfig::Instance()->get(request.getPath());

	if (target.host.empty())
	{
		// No route host for this path: serve it from the local server.
		response = app->getLocaleResult(request);
	}
	else
	{
		sp<Socket> sock = SocketPool::Connect(target.host.c_str(), target.port);

		if (!sock)
		{
			LogTrace(eERR, "request remote[%s] failed", path);

			remotestatus = XG_SENDFAIL;

			return SmartBuffer();
		}

		response = request.getResponse(sock);
	}

	if (!response)
	{
		LogTrace(eERR, "request remote[%s] failed", path);

		remotestatus = XG_RECVFAIL;

		return SmartBuffer();
	}

	remotestatus = response->getErrorCode();

	return response->getResult();
}

// Plugin entry point: publishes the routing hooks, loads the route configuration
// and starts the background ping thread.
HTTP_PLUGIN_INIT({
	Process::SetObject("HTTP_NOTIFY_HOST_FUNC", (void*)NotifyHost);
	Process::SetObject("HTTP_GET_ROUTE_HOST_FUNC", (void*)GetRouteHost);
	Process::SetObject("HTTP_BROADCAST_HOST_FUNC", (void*)BroadcastHost);
	Process::SetObject("HTTP_GET_REMOTE_RESULT_FUNC", (void*)GetRemoteResult);
	Process::SetObject("HTTP_UPDATE_ROUTE_LIST_FUNC", (void*)UpdateRouteList);
	Process::SetObject("HTTP_GET_REG_CENTER_HOST_FUNC", (void*)GetRegCenterHost);
	Process::SetObject("HTTP_GET_LAST_REMOTE_STATUS_FUNC", (void*)GetLastRemoteStatus);

	// Load the configuration BEFORE the worker starts: the thread reads the
	// RouteConfig host/port, which init() would otherwise be writing concurrently.
	RouteConfig::Instance()->init();

	static PingWorkThread thread;

	thread.start();

	return XG_OK;
})